﻿using Actor.Net.Network.Gate.Adapter;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Actor.Net.Network.Gate.WebSocket
{
    public class GateWebSocketChannel : GateBaseChannel
    {
        System.Net.WebSockets.WebSocket socket;

        private byte[] receiveBuffer { get; set; } = new byte[GatePacketParser.BufferLength];
        private ArrayBuffer sendBuffer { get; } = new ArrayBuffer();

        private volatile bool isStartSend = false;
        private object locker = new object();

        public GateWebSocketChannel(System.Net.WebSockets.WebSocket socket, GateBaseService netService, int channelId, EndPoint localEndPoint, EndPoint remoteEndPoint) : base(netService, channelId)
        {
            this.socket = socket;
            this.LocalEndPoint = localEndPoint;
            this.RemoteEndPoint = remoteEndPoint;
            this.Read();
        }

        public GateWebSocketChannel(GateBaseService netService, int channelId) : base(netService, channelId)
        {
            this.RemoteEndPoint = this.Network.ConnectEndPoint;
        }

        public override bool Connected => this.IsConnected();

        private bool IsConnected()
        {
            return this.socket == null ? false : this.socket.State == WebSocketState.Open;
        }

        public override bool Connect(int timeoutMillisecond)
        {
            return false;
        }

        public override async Task<bool> ConnectAsync(CancellationToken cancellationToken)
        {
            try
            {
                if (this.Connected)
                    return true;

                if (this.socket != null && this.socket.State == WebSocketState.Connecting)
                    return false;

                this.socket = new ClientWebSocket();
                await (this.socket as ClientWebSocket).ConnectAsync(new Uri((this.NetService as GateWebSocketService).HttpPrefixed), cancellationToken);
                this.NetService.ProcessConnected(this.ChannelContext);
                this.Read();
                return true;

            }
            catch (Exception ex)
            {
                this.NetService.ProcessSocketError(this.ChannelContext, ex);
                this.Reconnect();
                return false;
            }
        }

        public override void Write(byte[] bytes, int count)
        {
            if (bytes == null)
                return;

            if (count < GatePacketParser.HeadSize)
                throw new SocketException((int)SocketError.InvalidArgument);

            sendBuffer.Write(bytes, 0, count);

            //如果发送队列大于32立即发送数据
            if (sendBuffer.QueueCount > 32)
                StartSend();
        }

        private bool IsCanSend()
        {
            lock (this.locker)
            {
                if (this.isStartSend)
                    return false;

                this.isStartSend = true;
                return true;
            }
        }

        private void SetCanSend()
        {
            lock (this.locker)
            {
                if (this.isStartSend)
                    this.isStartSend = false;
            }
        }

        public override async void StartSend()
        {
            if (!this.Connected)
                return;

            if (!this.IsCanSend())
                return;

            try
            {
                var bytes = sendBuffer.Get(out int count);
                if (count == 0)
                {
                    this.SetCanSend();
                    return;
                }
                var segment = new ArraySegment<byte>(bytes, 0, count);
                try
                {
                    await this.socket.SendAsync(segment, WebSocketMessageType.Binary, true, CancellationToken.None);
                }
                catch (Exception ex)
                {
                    this.NetService.ProcessSocketError(this.ChannelContext, ex);
                    this.Disconnect();
                }
            }
            catch (Exception ex)
            {
                this.NetService.ProcessSocketError(this.ChannelContext, ex);
                this.Reconnect();
            }
            finally
            {
                this.SetCanSend();
                this.StartSend();
            }
        }

        public override async void Read()
        {
            while (true)
            {
                if (!this.Connected)
                    return;

                try
                {
                    var segment = new ArraySegment<byte>(this.receiveBuffer, 0, this.receiveBuffer.Length);
                    var recvResult = await this.socket.ReceiveAsync(segment, CancellationToken.None);

                    if (recvResult.Count == 0)
                    {
                        this.Disconnect();
                        return;
                    }

                    if (recvResult.MessageType == WebSocketMessageType.Close)
                    {
                        this.Disconnect();
                        return;
                    }

                    this.LastReceiveMillisecond = GateTimeUtils.NowMilliSecond();
                    int count = recvResult.Count;
                    int offset = 0;
                    while (true)
                    {
                        if (count == 0)
                            break;

                        try
                        {
                            bool parseOk = this.Parser.Parse(this.receiveBuffer, ref offset, ref count);
                            if (parseOk)
                            {
                                GateTransferContext context = this.Parser.GetTransferContext(this.ChannelContext);
                                if (context.RpcId > 0)
                                {
                                    if (this.CallContextList.TryRemove(context.RpcId, out IGateCallContext callContext))
                                        callContext.SetDeserializationResult(context);
                                }
                                else
                                {
                                    try
                                    {
                                        IGateMessageAdapter adapter = this.Network.GetAdapter(context.Command);
                                        adapter?.DispatchAdapter(context);
                                    }
                                    catch (Exception ex)
                                    {
                                        this.NetService.ProcessSocketError(this.ChannelContext, ex);
                                    }
                                }
                                this.Parser.Flush();
                            }
                        }
                        catch (Exception ex)
                        {
                            this.NetService.ProcessSocketError(this.ChannelContext, ex);
                            this.Disconnect();
                            return;
                        }
                    }
                }
                catch (Exception ex)
                {
                    this.NetService.ProcessSocketError(this.ChannelContext, ex);
                    this.Disconnect();
                    return;
                }
            }
        }

        public override void Disconnect()
        {
            try
            {
                this.socket.Abort();
                this.socket.Dispose();
            }
            catch (Exception ex)
            {
                this.NetService.ProcessSocketError(this.ChannelContext, ex);
                return;
            }

            this.Parser.Flush();
            foreach (var callContext in this.CallContextList.Values)
                callContext.SetDefaultResult();

            this.CallContextList.Clear();
            this.NetService.ProcessDisconnected(this.ChannelContext);

            this.Dispose();
        }

        public override void Dispose()
        {
            this.sendBuffer.Dispose();
        }
    }
}
